/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.catalina.tribes.demos;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.StringTokenizer;

import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.group.GroupChannel;
import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;
import org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator;
import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
import org.apache.catalina.tribes.transport.ReceiverBase;
import org.apache.catalina.tribes.util.Arrays;

public class CoordinationDemo {
	static int CHANNEL_COUNT = 5;
	static int SCREEN_WIDTH = 120;
	static long SLEEP_TIME = 10;
	static int CLEAR_SCREEN = 30;
	static boolean MULTI_THREAD = false;
	static boolean[] VIEW_EVENTS = new boolean[255];
	StringBuffer statusLine = new StringBuffer();
	Status[] status = null;
	BufferedReader reader = null;

	/**
	 * Construct and show the application.
	 */
	public CoordinationDemo() {
	}

	public void init() {
		reader = new BufferedReader(new InputStreamReader(System.in));
		status = new Status[CHANNEL_COUNT];
	}

	public void clearScreen() {
		StringBuffer buf = new StringBuffer(700);
		for (int i = 0; i < CLEAR_SCREEN; i++)
			buf.append("\n");
		System.out.println(buf);
	}

	public void printMenuOptions() {
		System.out.println("Commands:");
		System.out.println("\tstart [member id]");
		System.out.println("\tstop  [member id]");
		System.out.println("\tprint (refresh)");
		System.out.println("\tquit");
		System.out.print("Enter command:");
	}

	public synchronized void printScreen() {
		clearScreen();
		System.out.println(" ###." + getHeader());
		for (int i = 0; i < status.length; i++) {
			System.out.print(leftfill(String.valueOf(i + 1) + ".", 5, " "));
			if (status[i] != null)
				System.out.print(status[i].getStatusLine());
		}
		System.out.println("\n\n");
		System.out.println("Overall status:" + statusLine);
		printMenuOptions();

	}

	public String getHeader() {
		// member - 30
		// running- 10
		// coord - 30
		// view-id - 24
		// view count - 8

		StringBuffer buf = new StringBuffer();
		buf.append(leftfill("Member", 30, " "));
		buf.append(leftfill("Running", 10, " "));
		buf.append(leftfill("Coord", 30, " "));
		buf.append(leftfill("View-id(short)", 24, " "));
		buf.append(leftfill("Count", 8, " "));
		buf.append("\n");

		buf.append(rightfill(
				"==="
						+ new java.sql.Timestamp(System.currentTimeMillis())
								.toString(), SCREEN_WIDTH, "="));
		buf.append("\n");
		return buf.toString();
	}

	public String[] tokenize(String line) {
		StringTokenizer tz = new StringTokenizer(line, " ");
		String[] result = new String[tz.countTokens()];
		for (int i = 0; i < result.length; i++)
			result[i] = tz.nextToken();
		return result;
	}

	public void waitForInput() throws IOException {
		for (int i = 0; i < status.length; i++)
			status[i] = new Status(this);
		printScreen();
		String l = reader.readLine();
		String[] args = tokenize(l);
		while (args.length >= 1 && (!"quit".equalsIgnoreCase(args[0]))) {
			if ("start".equalsIgnoreCase(args[0])) {
				cmdStart(args);
			} else if ("stop".equalsIgnoreCase(args[0])) {
				cmdStop(args);

			}
			printScreen();
			l = reader.readLine();
			args = tokenize(l);
		}
		for (int i = 0; i < status.length; i++)
			status[i].stop();
	}

	private void cmdStop(String[] args) {
		if (args.length == 1) {
			setSystemStatus("System shutting down...");
			Thread[] t = new Thread[CHANNEL_COUNT];
			for (int i = 0; i < status.length; i++) {
				final int j = i;
				t[j] = new Thread() {
					public void run() {
						status[j].stop();
					}
				};
			}
			for (int i = 0; i < status.length; i++)
				if (MULTI_THREAD)
					t[i].start();
				else
					t[i].run();
			setSystemStatus("System stopped.");
		} else {
			int index = -1;
			try {
				index = Integer.parseInt(args[1]) - 1;
			} catch (Exception x) {
				setSystemStatus("Invalid index:" + args[1]);
			}
			if (index >= 0) {
				setSystemStatus("Stopping member:" + (index + 1));
				status[index].stop();
				setSystemStatus("Member stopped:" + (index + 1));
			}
		}
	}

	private void cmdStart(String[] args) {
		if (args.length == 1) {
			setSystemStatus("System starting up...");
			Thread[] t = new Thread[CHANNEL_COUNT];
			for (int i = 0; i < status.length; i++) {
				final int j = i;
				t[j] = new Thread() {
					public void run() {
						status[j].start();
					}
				};
			}
			for (int i = 0; i < status.length; i++)
				if (MULTI_THREAD)
					t[i].start();
				else
					t[i].run();
			setSystemStatus("System started.");
		} else {
			int index = -1;
			try {
				index = Integer.parseInt(args[1]) - 1;
			} catch (Exception x) {
				setSystemStatus("Invalid index:" + args[1]);
			}
			if (index >= 0) {
				setSystemStatus("Starting member:" + (index + 1));
				status[index].start();
				setSystemStatus("Member started:" + (index + 1));
			}
		}
	}

	public void setSystemStatus(String status) {
		statusLine.delete(0, statusLine.length());
		statusLine.append(status);
	}

	public static void setEvents(String events) {
		java.util.Arrays.fill(VIEW_EVENTS, false);
		StringTokenizer t = new StringTokenizer(events, ",");
		while (t.hasMoreTokens()) {
			int idx = Integer.parseInt(t.nextToken());
			VIEW_EVENTS[idx] = true;
		}
	}

	public static void run(String[] args, CoordinationDemo demo)
			throws Exception {
		usage();
		java.util.Arrays.fill(VIEW_EVENTS, true);

		for (int i = 0; i < args.length; i++) {
			if ("-c".equals(args[i]))
				CHANNEL_COUNT = Integer.parseInt(args[++i]);
			else if ("-t".equals(args[i]))
				MULTI_THREAD = Boolean.parseBoolean(args[++i]);
			else if ("-s".equals(args[i]))
				SLEEP_TIME = Long.parseLong(args[++i]);
			else if ("-sc".equals(args[i]))
				CLEAR_SCREEN = Integer.parseInt(args[++i]);
			else if ("-p".equals(args[i]))
				setEvents(args[++i]);
			else if ("-h".equals(args[i]))
				System.exit(0);
		}
		demo.init();
		demo.waitForInput();
	}

	private static void usage() {
		System.out.println("Usage:");
		System.out
				.println("\tjava org.apache.catalina.tribes.demos.CoordinationDemo -c channel-count(int) -t multi-thread(true|false) -s sleep-time(ms) -sc clear-screen(int) -p view_events_csv(1,2,5,7)");
		System.out.println("Example:");
		System.out
				.println("\tjava o.a.c.t.d.CoordinationDemo -> starts demo single threaded start/stop with 5 channels");
		System.out
				.println("\tjava o.a.c.t.d.CoordinationDemo -c 10 -> starts demo single threaded start/stop with 10 channels");
		System.out
				.println("\tjava o.a.c.t.d.CoordinationDemo -c 7 -t true -s 1000 -sc 50-> starts demo multi threaded start/stop with 7 channels and 1 second sleep time between events and 50 lines to clear screen");
		System.out
				.println("\tjava o.a.c.t.d.CoordinationDemo -t true -p 12 -> starts demo multi threaded start/stop with 5 channels and only prints the EVT_CONF_RX event");
		System.out.println();
	}

	public static void main(String[] args) throws Exception {
		CoordinationDemo demo = new CoordinationDemo();
		run(args, demo);
	}

	public static String leftfill(String value, int length, String ch) {
		return fill(value, length, ch, true);
	}

	public static String rightfill(String value, int length, String ch) {
		return fill(value, length, ch, false);
	}

	public static String fill(String value, int length, String ch, boolean left) {
		StringBuffer buf = new StringBuffer();
		if (!left)
			buf.append(value.trim());
		for (int i = value.trim().length(); i < length; i++)
			buf.append(ch);
		if (left)
			buf.append(value.trim());
		return buf.toString();
	}

	public static class Status {
		public CoordinationDemo parent;
		public GroupChannel channel;
		NonBlockingCoordinator interceptor = null;
		public String status;
		public Exception error;
		public String startstatus = "new";

		public Status(CoordinationDemo parent) {
			this.parent = parent;
		}

		public String getStatusLine() {
			// member - 30
			// running- 10
			// coord - 30
			// view-id - 24
			// view count - 8
			StringBuffer buf = new StringBuffer();
			String local = "";
			String coord = "";
			String viewId = "";
			String count = "0";
			if (channel != null) {
				Member lm = channel.getLocalMember(false);
				local = lm != null ? lm.getName() : "";
				coord = interceptor != null
						&& interceptor.getCoordinator() != null ? interceptor
						.getCoordinator().getName() : "";
				viewId = getByteString(interceptor.getViewId() != null ? interceptor
						.getViewId().getBytes()
						: new byte[0]);
				count = String.valueOf(interceptor.getView().length);
			}
			buf.append(leftfill(local, 30, " "));
			buf.append(leftfill(startstatus, 10, " "));
			buf.append(leftfill(coord, 30, " "));
			buf.append(leftfill(viewId, 24, " "));
			buf.append(leftfill(count, 8, " "));
			buf.append("\n");
			buf.append("Status:" + status);
			buf.append("\n");
			return buf.toString();
		}

		public String getByteString(byte[] b) {
			if (b == null)
				return "{}";
			return Arrays.toString(b, 0, Math.min(b.length, 4));
		}

		public void start() {
			try {
				if (channel == null) {
					channel = createChannel();
					startstatus = "starting";
					channel.start(channel.DEFAULT);
					startstatus = "running";
				} else {
					status = "Channel already started.";
				}
			} catch (Exception x) {
				synchronized (System.err) {
					System.err.println("Start failed:");
					StackTraceElement[] els = x.getStackTrace();
					for (int i = 0; i < els.length; i++)
						System.err.println(els[i].toString());
				}
				status = "Start failed:" + x.getMessage();
				error = x;
				startstatus = "failed";
				try {
					channel.stop(GroupChannel.DEFAULT);
				} catch (Exception ignore) {
				}
				channel = null;
				interceptor = null;
			}
		}

		public void stop() {
			try {
				if (channel != null) {
					channel.stop(channel.DEFAULT);
					status = "Channel Stopped";
				} else {
					status = "Channel Already Stopped";
				}
			} catch (Exception x) {
				synchronized (System.err) {
					System.err.println("Stop failed:");
					StackTraceElement[] els = x.getStackTrace();
					for (int i = 0; i < els.length; i++)
						System.err.println(els[i].toString());
				}

				status = "Stop failed:" + x.getMessage();
				error = x;
			} finally {
				startstatus = "stopped";
				channel = null;
				interceptor = null;
			}
		}

		public GroupChannel createChannel() {
			channel = new GroupChannel();
			((ReceiverBase) channel.getChannelReceiver()).setAutoBind(100);
			interceptor = new NonBlockingCoordinator() {
				public void fireInterceptorEvent(InterceptorEvent event) {
					status = event.getEventTypeDesc();
					int type = event.getEventType();
					boolean display = VIEW_EVENTS[type];
					if (display)
						parent.printScreen();
					try {
						Thread.sleep(SLEEP_TIME);
					} catch (Exception x) {
					}
				}
			};
			channel.addInterceptor(interceptor);
			channel.addInterceptor(new TcpFailureDetector());
			channel.addInterceptor(new MessageDispatch15Interceptor());
			return channel;
		}
	}
}